package org.moon.framework.autoconfigure.rabbitmq.customer;

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.moon.framework.autoconfigure.exception.domain.MoonException;
import org.moon.framework.autoconfigure.rabbitmq.config.MqttConfig;
import org.moon.framework.autoconfigure.rabbitmq.constant.MqttConstants;
import org.moon.framework.autoconfigure.rabbitmq.customer.annotation.MqttReceive;
import org.moon.framework.autoconfigure.config.SpringContextConfig;
import org.moon.framework.autoconfigure.rabbitmq.MqttAutoConfiguration;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

/**
 * 消费消息
 * @author ninglong
 * @Date 2020/11/5 14:31
 */
@Configuration
@IntegrationComponentScan
@ConditionalOnProperty(value="moon.mqtt.enabled",havingValue = "true")
@AutoConfigureBefore(MqttAutoConfiguration.class)
@Slf4j
public class MqttInboundConfiguration {

    @Autowired
    private MqttConfig mqttConfig;

    private static Map<String,IMqttReceive> receiveMap = Maps.newHashMap();

    /**
     * MQTT消费端订阅通道（消息入站）
     *
     * @return {@link MessageChannel}
     */
    @Bean(name = MqttConstants.CHANNEL_NAME_IN)
    public MessageChannel inboundChannel() {
        return new DirectChannel();
    }


    /**
     * MQTT消费端连接配置（消息入站）
     * @param channel {@link MessageChannel}
     * @param factory {@link MqttPahoClientFactory}
     * @return {@link MessageProducer}
     */
    @Bean
    public MessageProducer inbound(
            @Qualifier(MqttConstants.CHANNEL_NAME_IN) MessageChannel channel,
            @Qualifier(MqttConstants.FACTORY_NAME) MqttPahoClientFactory factory) {
        String [] topicNames = mqttConfig.getTopicNames().split(",");
        String [] qos = mqttConfig.getConsumerQos().split(",");
        if(qos.length>1&&qos.length!=topicNames.length){
            throw new MoonException("设置多个qos时，qos的数量要与topicName的数量一致");
        }
        // 可以同时消费（订阅）多个Topic
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                mqttConfig.getClientId() + "_customer"+System.currentTimeMillis(), factory, topicNames);
        // 设置操作完成的超时时长，默认30000毫秒
        adapter.setCompletionTimeout(mqttConfig.getCompletionTimeout());
        // 配置默认消息转换器(qos=0, retain=false, charset=UTF-8)
        adapter.setConverter(new DefaultPahoMessageConverter());
        // 0 至多一次，数据可能丢失
        // 1 至少一次，数据可能重复
        // 2 只有一次，且仅有一次，最耗性能
        adapter.setQos(Arrays.stream(qos).mapToInt(Integer::parseInt).toArray());
        // 设置订阅通道
        adapter.setOutputChannel(channel);
        return adapter;
    }

    /**
     * MQTT消费端消息处理器（消息入站）
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = MqttConstants.CHANNEL_NAME_IN)
    public MessageHandler inboundHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String curTopicName = message.getHeaders().get("mqtt_receivedTopic").toString();
                IMqttReceive receive = receiveMap.get(curTopicName);
                if(receive==null){//第一次为空，则去加载所有的消息实现类
                    receive = loadMqttReceive(curTopicName);
                }
                if(receive==null){
                    throw new MoonException("消息主题【"+curTopicName+"】没有对应的消息接收实现");
                }
                receive.handlerMessage(message.getPayload().toString());
            }
        };
    }

    private IMqttReceive loadMqttReceive(String curTopicName){
        IMqttReceive curReceive = null;
        Map<String, Object> map = SpringContextConfig.getApplicationContext().getBeansWithAnnotation(MqttReceive.class);
        Iterator<Map.Entry<String,Object>> it = map.entrySet().iterator();
        while (it.hasNext()){
            Map.Entry<String,Object> entry = it.next();
            IMqttReceive receive = (IMqttReceive)entry.getValue();
            Class<?> aClass = AopUtils.getTargetClass(entry.getValue());
            MqttReceive annotation = aClass.getAnnotation(MqttReceive.class);//获取注解详情
            String topicName = annotation.topicName();
            receiveMap.put(topicName,receive);
            if(curTopicName.equalsIgnoreCase(topicName)){
                curReceive = receive;
            }
        }
        return curReceive;
    }
}
